//package damo;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject;
//
//import com.bw.gmall.realtime.common.constant.Constant;
//import com.bw.gmall.realtime.common.util.HbaseUtil;
//import com.bw.gmall.realtime.common.util.RedisUtil;
//import io.lettuce.core.RedisFuture;
//import io.lettuce.core.api.StatefulRedisConnection;
//import org.apache.flink.configuration.Configuration;
//import org.apache.flink.streaming.api.functions.async.ResultFuture;
//import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
//import org.apache.hadoop.hbase.client.AsyncConnection;
//
//import java.io.IOException;
//import java.util.Collections;
//import java.util.concurrent.CompletableFuture;
//import java.util.concurrent.ExecutionException;
//import java.util.function.Consumer;
//import java.util.function.Function;
//import java.util.function.Supplier;
//
//public abstract class  DimAsyncFunction <T>extends RichAsyncFunction<T, T> implements IDimAsync<T> {
//    private StatefulRedisConnection<String, String> redisAsyncConnection;
//    private AsyncConnection hBaseAsyncConnection;
//    String table;
//    public DimAsyncFunction(String table) {
//        this.table = table;
//    }
//
//    @Override
//    public void open(Configuration parameters) throws Exception {
//        hBaseAsyncConnection = HbaseUtil.getHBaseAsyncConnection();
//        redisAsyncConnection = RedisUtil.getRedisAsyncConnection();
//    }
//
//    @Override
//    public void close() throws Exception {
//        HbaseUtil.closeAsyncConnection(hBaseAsyncConnection);
//        RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);
//    }
//    @Override
//    public void asyncInvoke(T t, ResultFuture<T> resultFuture) throws Exception {
//        String dimId = getDimId(t);
//        String redisKey = RedisUtil.getRedisKey(table, dimId);
//        //先查redis
//        CompletableFuture.supplyAsync(new Supplier<String>() {
//            @Override
//            public String get() {
//                String dimInfo = null;
//                RedisFuture<String> stringRedisFuture = redisAsyncConnection.async().get(redisKey);
//                try {
//                    dimInfo = stringRedisFuture.get();
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                } catch (ExecutionException e) {
//                    e.printStackTrace();
//                }
//                return dimInfo;
//            }
//        }).thenApplyAsync(new Function<String, JSONObject>() {
//            @Override
//            public JSONObject apply(String dimInfo) {
//                JSONObject jsonObject = null;
//                // redis没有该值
//                if (dimInfo == null || dimInfo.length() == 0) {
//                    try {
//                        jsonObject = HbaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, table, dimId);
//                        // 放到redis,进行缓存
//                        redisAsyncConnection.async().setex(redisKey, 24 * 3600, jsonObject.toJSONString());
//                    } catch (IOException e) {
//                        e.printStackTrace();
//                    }
//                }else{
//                    jsonObject = JSON.parseObject(dimInfo);
//                }
//                return jsonObject;
//            }
//        }).thenAccept(new Consumer<JSONObject>() {
//            @Override
//            public void accept(JSONObject dimInfo) {
//                if (dimInfo != null) {
//                    setDim(t,dimInfo);
//                } else {
//                    System.out.println("没有查到维度数据:" + table + ":" + dimId);
//                }
//                // 返回结果
//                resultFuture.complete(Collections.singletonList(t));
//            }
//        });
//    }
//
//
//}
